summaryrefslogtreecommitdiffstats
path: root/src/common/bounded_threadsafe_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/bounded_threadsafe_queue.h')
-rw-r--r--src/common/bounded_threadsafe_queue.h46
1 files changed, 29 insertions, 17 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h
index 975215863..0fb2f42d1 100644
--- a/src/common/bounded_threadsafe_queue.h
+++ b/src/common/bounded_threadsafe_queue.h
@@ -45,12 +45,12 @@ public:
}
void PopWait(T& t, std::stop_token stop_token) {
- Wait(stop_token);
+ ConsumerWait(stop_token);
Pop(t);
}
T PopWait(std::stop_token stop_token) {
- Wait(stop_token);
+ ConsumerWait(stop_token);
T t;
Pop(t);
return t;
@@ -88,9 +88,10 @@ private:
}
} else if constexpr (Mode == PushMode::Wait) {
// Wait until we have free slots to write to.
- while ((write_index - m_read_index.load()) == Capacity) {
- std::this_thread::yield();
- }
+ std::unique_lock lock{producer_cv_mutex};
+ producer_cv.wait(lock, [this, write_index] {
+ return (write_index - m_read_index.load()) < Capacity;
+ });
} else {
static_assert(Mode < PushMode::Count, "Invalid PushMode.");
}
@@ -105,8 +106,8 @@ private:
++m_write_index;
// Notify the consumer that we have pushed into the queue.
- std::scoped_lock lock{cv_mutex};
- cv.notify_one();
+ std::scoped_lock lock{consumer_cv_mutex};
+ consumer_cv.notify_one();
return true;
}
@@ -122,9 +123,10 @@ private:
}
} else if constexpr (Mode == PushMode::Wait) {
// Wait until we have free slots to write to.
- while ((write_index - m_read_index.load()) == Capacity) {
- std::this_thread::yield();
- }
+ std::unique_lock lock{producer_cv_mutex};
+ producer_cv.wait(lock, [this, write_index] {
+ return (write_index - m_read_index.load()) < Capacity;
+ });
} else {
static_assert(Mode < PushMode::Count, "Invalid PushMode.");
}
@@ -139,8 +141,8 @@ private:
++m_write_index;
// Notify the consumer that we have pushed into the queue.
- std::scoped_lock lock{cv_mutex};
- cv.notify_one();
+ std::scoped_lock lock{consumer_cv_mutex};
+ consumer_cv.notify_one();
return true;
}
@@ -161,6 +163,10 @@ private:
// Increment the read index.
++m_read_index;
+
+ // Notify the producer that we have popped off the queue.
+ std::unique_lock lock{producer_cv_mutex};
+ producer_cv.notify_one();
}
bool Pop(T& t) {
@@ -180,12 +186,16 @@ private:
// Increment the read index.
++m_read_index;
+ // Notify the producer that we have popped off the queue.
+ std::scoped_lock lock{producer_cv_mutex};
+ producer_cv.notify_one();
+
return true;
}
- void Wait(std::stop_token stop_token) {
- std::unique_lock lock{cv_mutex};
- Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); });
+ void ConsumerWait(std::stop_token stop_token) {
+ std::unique_lock lock{consumer_cv_mutex};
+ Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); });
}
alignas(128) std::atomic_size_t m_read_index{0};
@@ -193,8 +203,10 @@ private:
std::array<T, Capacity> m_data;
- std::condition_variable_any cv;
- std::mutex cv_mutex;
+ std::condition_variable_any producer_cv;
+ std::mutex producer_cv_mutex;
+ std::condition_variable_any consumer_cv;
+ std::mutex consumer_cv_mutex;
};
template <typename T, size_t Capacity = detail::DefaultCapacity>